
#include <cerrno>
#include <thread>
#include "HEpoll.h"
#include "Socket.h"
#include "HoComm.h"
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <sys/timerfd.h>
#include <fcntl.h>
#include <functional>
#include <cstring>

// Capacity of the epoll_event array that HEpoll::_run() hands to epoll_wait(),
// i.e. the maximum number of ready events fetched per wake-up.
#define MAX_EVENT_NUMBER 1024

// Global descriptor pair, both slots start as INVALID_FD.
// NOTE(review): nothing in the visible part of this file reads or writes it.
int g_pipefd[2] = {INVALID_FD, INVALID_FD};

// Builds an idle instance: no epoll descriptor yet (epfd = INVALID_FD), no
// worker thread recorded (newThread = false) and the exit flag cleared.
// The epoll descriptor itself is created later by init().
HEpoll::HEpoll( void ): epfd(INVALID_FD), newThread(false), exit(false)
{

}

// Tears the instance down through unInit(), which releases the epoll
// descriptor and joins the worker thread if run(true) started one.
HEpoll::~HEpoll( void )
{
    unInit();
}

// Creates the epoll instance and stores its descriptor in epfd.
// Returns 0 on success, -13 when epoll_create() fails.
// (An older signature took "int port, const char* svrhost, int lqueue";
// those parameters are no longer used.)
int HEpoll::init( void )
{
    epfd = epoll_create(1024);
    if (INVALID_FD == epfd)
    {
        return -13;
    }

    return 0;
}

int HEpoll::run( bool runInNewThread )
{
    int ret = 0;
    if (runInNewThread)
    {
        thread = std::thread(&HEpoll::_run, this);
        newThread = true;
    }
    else
    {
        ret = _run();
    }
    
    return ret;
}

int HEpoll::_run( void )
{
    int ret = 0;
    struct epoll_event backEvs[MAX_EVENT_NUMBER];

    while ( !exit )
    {
        int number = epoll_wait(epfd, backEvs, MAX_EVENT_NUMBER, -1);
        // LOGDEBUG("epoll wait return %d", number);
        if((number < 0) && (errno != EINTR)) break;

        for (int i = 0; i < number; i++)
        {
            /*** EPOLLIN=1 EPOLLOUT=4 EPOLLERR=8 EPOLLHUP=16 ***/
            int fd = backEvs[i].data.fd;
            // to do
            ConnectMetaSPtr ptr;
            {
                lock_guard<mutex> lock(mapMtx);
                auto itr = connMetaMap.find(fd);
                if (itr != connMetaMap.end())
                {
                    ptr = itr->second;
                }
            }
            
            if (ptr)
            {
                ret = ptr->onEpollEvent(backEvs[i].events);
            }
            else
            {
                epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
            }
        }
    }

    LOGINFO("ep thread exit");
    return ret;
}

// Asks the event loop to stop. _run() only re-checks the exit flag after
// epoll_wait() returns, so a one-shot 1-second timer without callback is
// added purely to generate an event that wakes the loop up.
void HEpoll::notifyExit( void )
{
    exit = true; // set first so the flag is already visible when the timer event arrives
    addTimer(1, true, NULL); // result (timer fd, or -1 on failure) is not checked here
}

// Releases the epoll descriptor and, when run(true) started a worker thread,
// waits for that thread to finish.
// NOTE(review): epfd is released before the join while the worker may still
// be using it; presumably callers invoke notifyExit() first - confirm.
void HEpoll::unInit( void )
{
    IFCLOSEFD(epfd);

    if (!newThread)
    {
        return; // loop ran on the caller's thread (or never ran): nothing to join
    }

    thread.join();
    newThread = false;
}

// Builds a short diagnostic string: "epfd <fd> connSize is <n>", followed by
// "U" when mapMtx could be acquired at the moment of the call (i.e. it was
// unlocked). The map size is read without taking the lock, so this never
// blocks on mapMtx.
string HEpoll::toString()
{
    string desc = "epfd " + to_string(epfd) + " connSize is " + to_string(connMetaMap.size());

    const bool mutexWasFree = mapMtx.try_lock();
    if (mutexWasFree)
    {
        mapMtx.unlock();
        desc += "U";
    }

    return desc;
}

//////////////////////////////////////////////
// Splits the received byte stream into requests, one per '\n'-terminated line.
//
// reqQueue - receives every complete, non-empty line (without the '\n')
// stream   - in/out buffer; everything up to and including the last '\n' is
//            removed, so an unterminated tail stays for the next call
// connMeta - not used by this splitter
void FuncSplitReqLine(queue<string>& reqQueue, string& stream, weak_ptr<ConnectMeta> connMeta)
{
    const char delimiter = '\n';
    size_t lineBegin = 0;

    while (true)
    {
        const size_t lineEnd = stream.find(delimiter, lineBegin);
        if (string::npos == lineEnd)
        {
            break; // no further complete line in the buffer
        }

        if (lineEnd != lineBegin) // skip empty lines
        {
            reqQueue.push(stream.substr(lineBegin, lineEnd - lineBegin));
        }
        lineBegin = lineEnd + 1;
    }

    // Drop the consumed prefix; the partial last line (if any) is kept.
    if (0 != lineBegin)
    {
        stream.erase(0, lineBegin);
    }
}

// Default handler for an arrived request: it only writes the message to the
// debug log.
// msg      - one request, as produced by the split function
// connMeta - connection the message belongs to; not used here
void FuncOnMessageReach(const string& msg, weak_ptr<ConnectMeta> connMeta)
{
    LOGDEBUG("recv msg: %s", msg.c_str());

}

// 有新连接进来
void HEpoll::FuncOnAccept(int cliFd, ConnectMetaSPtr listenMeta)
{
    auto newCliMeta = listenMeta->cloneNew(cliFd, FDS_ESTABLISH);

    {
        lock_guard<mutex> lock(mapMtx);
        this->connMetaMap[cliFd] = ConnectMetaSPtr(newCliMeta);
    }
    newCliMeta->addEvt(EPOLLIN | EPOLLET);
    LOGDEBUG("accept one fd=%d", cliFd);
}

// Called when a connection has been closed or has failed; only logs.
// fd     - descriptor of the connection
// errMsg - empty for a normal close, otherwise the reason text.
// A non-empty reason other than ACTIVE_STOP_REASON counts as an error and is
// passed as the first argument of LOGOPT_EI (presumably selecting the error
// log level instead of info - confirm against the macro).
void HEpoll::FuncOnClose(int fd, const string& errMsg)
{
    const bool normalClose = errMsg.empty();
    const bool errHappen = !normalClose && ACTIVE_STOP_REASON != errMsg;
    const char* reason = normalClose ? "normal close" : errMsg.c_str();

    LOGOPT_EI(errHappen, "fd=%d connect end %s", fd, reason);
}

void HEpoll::FuncOnRemoveRef(int fd, ConnectMetaSPtr connMeta)
{
    {
        lock_guard<mutex> lock(mapMtx);
        this->connMetaMap.erase(fd);
    }

    if (aliasNameRef) // 删除别名引用
    {
        aliasNameRef->delAlias(connMeta);
    }
}

void HEpoll::FuncOnTimer(int timerFd, bool once, function<void(int timerFd)> callBack)
{
    uint64_t rdmsg = 0;
    read(timerFd, &rdmsg, sizeof(rdmsg)); 
    LOGDEBUG("func cb x=%d, rdmsg=%u", timerFd, (unsigned)rdmsg);
    if (callBack)
    {
        callBack(timerFd);
    }

    if (once)
    {
        delTimer(timerFd);
    }
}

//////////////////////////////////////////////

// Creates a non-blocking IPv4 TCP listening socket bound to INADDR_ANY:port
// and wraps it in a ConnectMeta in state FDS_INITIAL_LISTEN.
//
// port - TCP port to listen on
// Returns the new ConnectMeta, or an empty pointer when socket(), bind() or
// listen() fails (the socket is closed again in that case).
ConnectMetaSPtr HEpoll::addTcpListen(int port)
{
    ConnectMetaSPtr connMeta;

    int sockFd = socket(AF_INET, SOCK_STREAM, 0);
    if (INVALID_FD == sockFd)
    {
        return connMeta;
    }

    struct sockaddr_in ip4addr;
    memset(&ip4addr, 0, sizeof(ip4addr));
    ip4addr.sin_family = AF_INET;
    ip4addr.sin_port = htons(port);
    ip4addr.sin_addr.s_addr = htonl(INADDR_ANY);
    // ret = inet_pton(PF_INET, ipaddr, (void*)&ip4addr.sin_addr.s_addr); // listen on one specific address instead

    int reuseaddr = 1;
    setsockopt(sockFd, SOL_SOCKET, SO_REUSEADDR, (const void *)&reuseaddr, sizeof(reuseaddr));

    if (0 == bind(sockFd, (struct sockaddr*)&ip4addr, sizeof(ip4addr))
        && 0 == ::listen(sockFd, LISTEN_BACKLOG))
    {
        // Only OR in O_NONBLOCK when the current flags could be read.
        const int flags = fcntl(sockFd, F_GETFL);
        if (flags >= 0)
        {
            fcntl(sockFd, F_SETFL, flags | O_NONBLOCK);
        }
        connMeta = addInitConnMeta(sockFd, FDS_INITIAL_LISTEN);
    }
    else
    {
        // bind/listen failed: release the descriptor instead of leaking it.
        LOGERROR("listen on port %d failed, Error:[%d:%s]", port, errno, strerror(errno));
        IFCLOSEFD(sockFd);
    }

    return connMeta;
}

// Opens an outgoing TCP connection through Socket::connect() and wraps the
// socket in a ConnectMeta. The caller is expected to call start() on the
// returned object afterwards.
//
// dstHost / port - peer address
// timeoutSec     - forwarded to Socket::connect()
// noblock        - forwarded to Socket::connect()
// Returns a ConnectMeta in state FDS_ESTABLISH when connect returned 0, in
// state FDS_CONNECTING when it returned ERRSOCK_AGAIN, and an empty pointer
// (socket released) for any other result.
ConnectMetaSPtr HEpoll::addTcpConnect(const string& dstHost, int port, int timeoutSec, bool noblock)
{
    int sockFd = INVALID_FD;
    const int connRet = Socket::connect(sockFd, dstHost.c_str(), port, timeoutSec, noblock);

    LOGDEBUG("add fd=%d to epoll, connect-ret=%d", sockFd, connRet);

    if (0 == connRet)
    {
        return addInitConnMeta(sockFd, FDS_ESTABLISH);
    }

    if (ERRSOCK_AGAIN == connRet)
    {
        return addInitConnMeta(sockFd, FDS_CONNECTING);
    }

    IFCLOSEFD(sockFd);
    return ConnectMetaSPtr();
}

// Variant of addTcpConnect() that always passes noblock = true and installs
// connectResultCB as the connection's onConnectResultFunc.
// Returns the ConnectMeta, or an empty pointer when addTcpConnect() failed
// (the callback is not stored in that case).
ConnectMetaSPtr HEpoll::addTcpConnect2(const string& dstHost, int port, int timeoutSec, 
        function<void(bool,ConnectMetaSPtr)> connectResultCB)
{
    ConnectMetaSPtr connMeta = addTcpConnect(dstHost, port, timeoutSec, true);
    if (!connMeta)
    {
        return connMeta;
    }

    connMeta->onConnectResultFunc = connectResultCB;
    return connMeta;
}

// Creates the ConnectMeta for sockFd in the given state, wires the default
// callbacks (line splitter, message logger, and this object's accept / close
// / remove-reference handlers) and records it in connMetaMap.
// Returns the new ConnectMeta.
ConnectMetaSPtr HEpoll::addInitConnMeta(int sockFd, FdState fdState)
{
    using std::placeholders::_1;
    using std::placeholders::_2;

    auto meta = make_shared<ConnectMeta>(epfd, sockFd, fdState);

    // Handlers implemented by this HEpoll instance.
    meta->onAcceptFunc = std::bind(&HEpoll::FuncOnAccept, this, _1, _2);
    meta->onCloseFunc = std::bind(&HEpoll::FuncOnClose, this, _1, _2);
    meta->_onRemoveRefFunc = std::bind(&HEpoll::FuncOnRemoveRef, this, _1, _2);

    // Free-function defaults for request splitting and message handling.
    meta->splitReqFunc = FuncSplitReqLine;
    meta->onMessageReachFunc = FuncOnMessageReach;

    {
        lock_guard<mutex> guard(mapMtx);
        connMetaMap[sockFd] = meta;
    }
    return meta;
}

// Creates a timerfd-based timer and attaches it to this object.
//
// sec      - seconds until the first expiry; also the repeat period when
//            once is false. Note: timerfd_settime() treats an all-zero
//            it_value as "disarm", so sec == 0 yields a timer that never fires.
// once     - true: single expiry (FuncOnTimer removes the timer afterwards);
//            false: periodic
// callBack - called with the timer fd on every expiry; may be empty
// Returns the timer fd (the handle expected by delTimer()), or -1 when the
// timer could not be created or armed.
int HEpoll::addTimer(int sec, bool once, function<void(int timerFd)> callBack)
{
    struct itimerspec itmer;

    itmer.it_value.tv_sec = sec;
    itmer.it_value.tv_nsec = 0;
    itmer.it_interval.tv_sec =  (once? 0: sec);
    itmer.it_interval.tv_nsec = 0;

    // TFD_NONBLOCK is the flag timerfd_create() documents (instead of the
    // fcntl-level O_NONBLOCK name).
    int tmfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC);
    if (tmfd < 0) {
        LOGERROR("timerfd_create error, Error:[%d:%s]", errno, strerror(errno));
        //errno = "timerfd_create error";
        return -1;
    }

    int ret = timerfd_settime(tmfd, 0, &itmer, NULL);
    if (ret < 0) {
        LOGERROR("timerfd_settime error, Error:[%d:%s]", errno, strerror(errno));
        close(tmfd);
        //errno = "timerfd_settime error";
        return -1;
    }

    auto connMeta = make_shared<ConnectMeta>(epfd, tmfd, FDS_TIMERFD);
    connMeta->onTimerFunc = bind(&HEpoll::FuncOnTimer, this, tmfd, once, callBack);

    // Publish the entry before enabling events: _run() removes from epoll any
    // descriptor it cannot find in connMetaMap, so the map must know the timer
    // before its first event can be delivered.
    {
        lock_guard<mutex> lock(mapMtx);
        connMetaMap[tmfd] = connMeta;
    }
    connMeta->setEvt(EPOLLIN | EPOLLET);

    return tmfd;
}

void HEpoll::delTimer(int timerFd)
{
    lock_guard<mutex> lock(mapMtx);
    auto itr = connMetaMap.find(timerFd);
    if (itr != connMetaMap.end())
    {
        itr->second->setEvt(0);
        connMetaMap.erase(itr);
    }
}

// Looks up the ConnectMeta registered for a descriptor.
// Returns an empty pointer when fd is not in connMetaMap.
ConnectMetaSPtr HEpoll::find(int fd)
{
    ConnectMetaSPtr result;

    lock_guard<mutex> guard(mapMtx);
    auto entry = connMetaMap.find(fd);
    if (connMetaMap.end() != entry)
    {
        result = entry->second;
    }

    return result;
}

// Installs the alias registry used by addAliasName(), delAliasName(),
// find(const string&) and FuncOnRemoveRef(). While no registry is set those
// operations do nothing (or return false / an empty pointer).
void HEpoll::setAliasNameInstance(shared_ptr<AliasRef<ConnectMetaSPtr>> ptr)
{
    aliasNameRef = ptr;
}

// Registers aliasName for connMeta in the alias registry.
// Returns the registry's addAlias() result, or false when no registry has
// been installed.
bool HEpoll::addAliasName(const string& aliasName, ConnectMetaSPtr connMeta)
{
    if (!aliasNameRef)
    {
        return false;
    }

    return aliasNameRef->addAlias(aliasName, connMeta);
}

// Removes aliasName from the alias registry; does nothing when no registry
// has been installed.
void HEpoll::delAliasName(const string& aliasName)
{
    if (!aliasNameRef)
    {
        return;
    }

    aliasNameRef->delAlias(aliasName);
}

// Looks up a connection by alias through the alias registry.
// Returns an empty pointer when no registry has been installed; otherwise
// whatever the registry's find() returns for aliasName.
ConnectMetaSPtr HEpoll::find(const string& aliasName)
{
    if (!aliasNameRef)
    {
        return ConnectMetaSPtr();
    }

    return aliasNameRef->find(aliasName);
}
